/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.iterate;

import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.CDC_DATA_TABLE_DEF;
import static org.apache.phoenix.coprocessorclient.ScanRegionObserverConstants.WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.coprocessor.CDCGlobalIndexRegionScanner;
import org.apache.phoenix.coprocessor.UncoveredGlobalIndexRegionScanner;
import org.apache.phoenix.coprocessor.UncoveredLocalIndexRegionScanner;
import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerIndexUtil;

import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;

public abstract class RegionScannerFactory {

  protected RegionCoprocessorEnvironment env;

  /**
   * Returns the region based on the value of the region context
   */
  public Region getRegion() {
    return env.getRegion();
  }

  /**
   * Returns a processed region scanner based on the query conditions. Thie functionality is
   * abstracted out of the non-aggregate region observer class for better usage
   * @param scan input scan
   * @param s    input region scanner
   */
  public abstract RegionScanner getRegionScanner(final Scan scan, final RegionScanner s)
    throws Throwable;

  /**
   * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and re-throws as
   * DoNotRetryIOException to prevent needless retrying hanging the query for 30 seconds.
   * Unfortunately, until HBASE-7481 gets fixed, there's no way to do the same from a custom filter.
   * @param offset starting position in the rowkey.
   * @param tx     current transaction
   */
  public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env,
    final RegionScanner regionScanner, final Set<KeyValueColumnExpression> serverParsedKVRefs,
    final Expression[] serverParsedFuncRefs, final int offset, final Scan scan,
    final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
    final Region dataRegion, final IndexMaintainer indexMaintainer, PhoenixTransactionContext tx,
    final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet,
    final TupleProjector projector, final ImmutableBytesWritable ptr,
    final boolean useQualifierAsListIndex) throws IOException {
    return new RegionScanner() {
      private RegionScanner s = regionScanner;
      private RegionInfo regionInfo = env.getRegionInfo();
      private byte[] actualStartKey = getActualStartKey();
      private boolean useNewValueColumnQualifier =
        EncodedColumnsUtil.useNewValueColumnQualifier(scan);
      final long pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
      Expression extraWhere = null;
      long extraLimit = -1;

      {
        // for indexes construct the row filter for uncovered columns if it exists
        if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
          byte[] expBytes = scan.getAttribute(BaseScannerRegionObserverConstants.INDEX_FILTER);
          if (expBytes == null) {
            // For older clients
            expBytes = scan.getAttribute(BaseScannerRegionObserverConstants.LOCAL_INDEX_FILTER);
          }
          if (expBytes != null) {
            try {
              ByteArrayInputStream stream = new ByteArrayInputStream(expBytes);
              DataInputStream input = new DataInputStream(stream);
              extraWhere = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
              extraWhere.readFields(input);
            } catch (IOException io) {
              // should not happen since we're reading from a byte[]
              throw new RuntimeException(io);
            }
          }
          byte[] limitBytes = scan.getAttribute(BaseScannerRegionObserverConstants.INDEX_LIMIT);
          if (limitBytes == null) {
            // For older clients
            limitBytes = scan.getAttribute(BaseScannerRegionObserverConstants.LOCAL_INDEX_LIMIT);
          }
          if (limitBytes != null) {
            extraLimit = Bytes.toLong(limitBytes);
          }
          if (
            ScanUtil.isLocalOrUncoveredGlobalIndex(scan) && (tupleProjector != null
              || (indexMaintainer != null && indexMaintainer.isUncovered()))
          ) {

            PTable.ImmutableStorageScheme storageScheme = indexMaintainer.getIndexStorageScheme();
            Scan dataTableScan = new Scan();
            if (dataColumns != null) {
              for (int i = 0; i < dataColumns.length; i++) {
                if (storageScheme == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
                  dataTableScan.addFamily(dataColumns[i].getFamily());
                } else {
                  dataTableScan.addColumn(dataColumns[i].getFamily(),
                    dataColumns[i].getQualifier());
                }
              }
            } else if (indexMaintainer.isUncovered()) {
              // Indexed columns and the columns in index where clause should also be added
              // to the data columns to scan for uncovered global indexes. This is required
              // to verify the index row against the data table row.
              for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
                dataTableScan.addColumn(column.getFamily(), column.getQualifier());
              }
            }
            if (ScanUtil.isLocalIndex(scan)) {
              s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env,
                dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs,
                offset, actualStartKey, extraLimit);
            } else {
              if (scan.getAttribute(CDC_DATA_TABLE_DEF) != null) {
                s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
                  dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs,
                  extraLimit);
              } else {
                s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
                  dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs,
                  extraLimit);
              }
            }
          }
        }
      }

      // Get the actual scan start row of local index. This will be used to compare the row
      // key of the results less than scan start row when there are references.
      public byte[] getActualStartKey() {
        return ScanUtil.isLocalIndex(scan) ? ScanUtil.getActualStartRow(scan, regionInfo) : null;
      }

      @Override
      public boolean next(List<Cell> results) throws IOException {
        return next(results, null);
      }

      @Override
      public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
        try {
          boolean next =
            (scannerContext == null) ? s.next(results) : s.next(results, scannerContext);
          if (ScanUtil.isDummy(results)) {
            return true;
          }
          return next;
        } catch (Throwable t) {
          ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
          return false; // impossible
        }
      }

      @Override
      public void close() throws IOException {
        s.close();
      }

      @Override
      public RegionInfo getRegionInfo() {
        return s.getRegionInfo();
      }

      @Override
      public boolean isFilterDone() throws IOException {
        return s.isFilterDone();
      }

      @Override
      public boolean reseek(byte[] row) throws IOException {
        return s.reseek(row);
      }

      @Override
      public long getMvccReadPoint() {
        return s.getMvccReadPoint();
      }

      @Override
      public boolean nextRaw(List<Cell> result) throws IOException {
        return nextRaw(result, null);
      }

      @Override
      public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
        try {
          boolean next =
            (scannerContext == null) ? s.nextRaw(result) : s.nextRaw(result, scannerContext);
          if (ScanUtil.isDummy(result)) {
            return true;
          }
          if (result.size() == 0) {
            return next;
          }
          if ((ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) {
            if (ScanUtil.isLocalIndex(scan) && tupleProjector == null) {
              if (actualStartKey != null) {
                next = scanTillScanStartRow(s, result, null);
                if (result.isEmpty() || ScanUtil.isDummy(result)) {
                  return next;
                }
              }
              ServerIndexUtil.wrapResultUsingOffset(result, offset);
            }

            if (extraWhere != null) {
              Tuple merged = useQualifierAsListIndex
                ? new PositionBasedResultTuple(result)
                : new ResultTuple(Result.create(result));
              ImmutableBytesWritable ptr = new ImmutableBytesWritable();
              extraWhere.evaluate(merged, ptr);
              if (!Boolean.TRUE.equals(extraWhere.getDataType().toObject(ptr))) {
                result.clear();
                return next;
              }
            }
          }
          Cell serverParsedResultCell = null;
          if (
            serverParsedFuncRefs != null && serverParsedFuncRefs.length > 0
              && serverParsedKVRefs.size() > 0
          ) {
            int resultPosition = replaceServerParsedExpressionElement(serverParsedKVRefs,
              serverParsedFuncRefs, result);
            serverParsedResultCell = result.get(resultPosition);
          }
          if (projector != null) {
            Tuple toProject = useQualifierAsListIndex
              ? new PositionBasedResultTuple(result)
              : new ResultTuple(Result.create(result));

            Pair<Tuple, byte[]> mergedTupleDynColsPair = getTupleWithDynColsIfRequired(result,
              projector.projectResults(toProject, useNewValueColumnQualifier));
            Tuple tupleWithDynColsIfReqd = mergedTupleDynColsPair.getFirst();
            byte[] serializedDynColsList = mergedTupleDynColsPair.getSecond();

            result.clear();
            result.add(tupleWithDynColsIfReqd.mergeWithDynColsListBytesAndGetValue(0,
              serializedDynColsList));
            if (serverParsedResultCell != null) {
              result.add(serverParsedResultCell);
            }
          }
          if (extraLimit >= 0 && --extraLimit == 0) {
            return false;
          }
          return next;
        } catch (Throwable t) {
          ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
          return false; // impossible
        }
      }

      /**
       * Iterate over the list of cells returned from the scan and use the dynamic column metadata
       * to create a tuple projector for dynamic columns. Finally, merge this with the projected
       * values corresponding to the known columns
       * @param result list of cells returned from the scan
       * @param tuple  projected value tuple from known schema/columns
       * @return A pair, whose first part is a combined projected value tuple containing the known
       *         column values along with resolved dynamic column values and whose second part is
       *         the serialized list of dynamic column PColumns. In case dynamic columns are not to
       *         be exposed or are not present, this returns the original tuple and an empty byte
       *         array.
       * @throws IOException Thrown if there is an error parsing protobuf or merging projected
       *                     values
       */
      private Pair<Tuple, byte[]> getTupleWithDynColsIfRequired(List<Cell> result, Tuple tuple)
        throws IOException {
        // We only care about dynamic column cells if the scan has this attribute set
        if (Bytes.equals(scan.getAttribute(WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS), TRUE_BYTES)) {
          List<PColumn> dynCols = new ArrayList<>();
          List<Cell> dynColCells = new ArrayList<>();
          TupleProjector dynColTupleProj =
            TupleProjector.getDynamicColumnsTupleProjector(result, dynCols, dynColCells);
          if (dynColTupleProj != null) {
            Tuple toProject = useQualifierAsListIndex
              ? new PositionBasedResultTuple(dynColCells)
              : new ResultTuple(Result.create(dynColCells));
            Tuple dynColsProjectedTuple =
              dynColTupleProj.projectResults(toProject, useNewValueColumnQualifier);

            ValueBitSet destBitSet = projector.getValueBitSet();
            // In case we are not projecting any non-row key columns, the field count for the
            // current projector will be 0, so we simply use the dynamic column projector's
            // value bitset as the destination bitset.
            if (projector.getSchema().getFieldCount() == 0) {
              destBitSet = dynColTupleProj.getValueBitSet();
            }
            // Add dynamic column data at the end of the projected tuple
            Tuple mergedTuple =
              TupleProjector.mergeProjectedValue((TupleProjector.ProjectedValueTuple) tuple,
                destBitSet, dynColsProjectedTuple, dynColTupleProj.getValueBitSet(),
                projector.getSchema().getFieldCount(), useNewValueColumnQualifier);

            // We send the serialized list of PColumns for dynamic columns back to the client
            // so that the client can process the corresponding projected values
            DynamicColumnMetaDataProtos.DynamicColumnMetaData.Builder dynColsListBuilder =
              DynamicColumnMetaDataProtos.DynamicColumnMetaData.newBuilder();
            for (PColumn dynCol : dynCols) {
              dynColsListBuilder.addDynamicColumns(PColumnImpl.toProto(dynCol));
            }
            return new Pair<>(mergedTuple, dynColsListBuilder.build().toByteArray());
          }
        }
        return new Pair<>(tuple, new byte[0]);
      }

      /**
       * When there is a merge in progress while scanning local indexes we might get the key values
       * less than scan start row. In that case we need to scan until get the row key more or equal
       * to scan start key. TODO try to fix this case in LocalIndexStoreFileScanner when there is a
       * merge.
       */
      private boolean scanTillScanStartRow(final RegionScanner s, List<Cell> result,
        ScannerContext scannerContext) throws IOException {
        boolean next = true;
        Cell firstCell = result.get(0);
        long startTime = EnvironmentEdgeManager.currentTimeMillis();
        while (
          Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
            firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0
        ) {
          if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
            byte[] rowKey = CellUtil.cloneRow(result.get(0));
            result.clear();
            ScanUtil.getDummyResult(rowKey, result);
            return true;
          }
          result.clear();
          if (scannerContext == null) {
            next = s.nextRaw(result);
          } else {
            next = s.nextRaw(result, scannerContext);
          }
          if (result.isEmpty()) {
            return next;
          }
          if (ScanUtil.isDummy(result)) {
            return true;
          }
          firstCell = result.get(0);
        }
        return next;
      }

      private int replaceServerParsedExpressionElement(
        final Set<KeyValueColumnExpression> serverParsedKVRefs,
        final Expression[] serverParsedFuncRefs, List<Cell> result) {
        // make a copy of the results array here, as we're modifying it below
        MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result));
        // The size of both the arrays would be same?
        // Using KeyValueSchema to set and retrieve the value
        // collect the first kv to get the row
        Cell rowKv = result.get(0);
        for (KeyValueColumnExpression kvExp : serverParsedKVRefs) {
          if (kvExp.evaluate(tuple, ptr)) {
            ListIterator<Cell> itr = result.listIterator();
            while (itr.hasNext()) {
              Cell kv = itr.next();
              if (
                Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
                  kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
                  && Bytes.equals(kvExp.getColumnQualifier(), 0, kvExp.getColumnQualifier().length,
                    kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())
              ) {
                // remove the kv that has the full array/json values.
                itr.remove();
                break;
              }
            }
          }
        }
        byte[] value = kvSchema.toBytes(tuple, serverParsedFuncRefs, kvSchemaBitSet, ptr);
        // Add a dummy kv with the exact value of the array index or json value
        result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(),
          QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0,
          QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length,
          QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0,
          QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
          KeyValue.Type.codeToType(rowKv.getType().getCode()), value, 0, value.length));
        return getArrayCellPosition(result);
      }

      @Override
      public long getMaxResultSize() {
        return s.getMaxResultSize();
      }

      @Override
      public int getBatch() {
        return s.getBatch();
      }
    };
  }

  // PHOENIX-4791 Share position of array element cell
  public static int getArrayCellPosition(List<Cell> result) {
    return result.size() - 1;
  }
}
